Spark 3.0 自适应查询执行

1 背景

  • 深入研究的数据库文献
  • Spark1.6概念引入
  • Intel原型和实验的New AQE
  • Databricks和Intel协作的Spark 3.0中的New AQE

2 定义与动机

(1) 定义

基于运行时统计的,查询执行时的,动态查询优化。

(2) 动机

Spark 2.2引入CBO,通过输入大小和基数估计选择最佳的执行计划。

基于成本的优化CBO经常受限:

  • 脏的或缺失的统计数据导致不准确的估计
  • 统计数据收集成本高昂,如列直方图
  • 估算包含UDF
  • 提示Hint对于快速变化的数据无效

AQE的所有优化基于准确的运行时统计信息。

AQE优化发生在阶段间隙,此时数据容量、分区大小等统计信息可用。

3 框架与工作流

(1) 过程

  • 运行leaf stages
  • 当stage结束,根据最新统计信息优化
  • 运行更多满足依赖关系的stage
  • 重复2、3步骤直到没有更多的stage运行

(2) 主要特性

  • 动态shuffle分区
  • 动态转换关联策略
  • 动态优化倾斜关联

(3) 动态Shuffle分区

1) 背景

AQE关闭时,分区数是固定的,但是在查询执行时数据量确实变化的。

过大的分区导致GC压力和磁盘溢出,过多的分区导致低效I/O、调度和任务创建负载过高

2) AQE方案

  • 初始时设置较到的分区数据,以处理较大的分区
  • 在每个stage后自动调整分区数量,如合并相邻分区

截屏2020-12-31 上午9.32.06

注意:上图数据流自下而上

(4) 动态转换关联策略

1) 背景

当关联的其中一个表可以放入内存时,Spark选择Broadcast Hash Join。

但是由于估计依赖于统计值。在AQE之前,统计值不能准确评估基数或选择性估计;子关系是复杂的操作树;存在UDF等黑箱预测等。

2) AQE方案

使用运行时的数据大小重新规划关联。

截屏2020-12-31 上午9.45.54

注意:上图中根据运行时的数据大小数据,将SORT MERGE JOIN转换为BROADCAST HASH JOIN

(5) 动态优化倾斜关联

1) 背景

数据倾斜导致性能丢失,延长了作业执行时间,尤其是大分区导致的数据溢出。

2) AQE方案

使用运行时统计信息自动处理数据倾斜

  • 从分区大小检测倾斜
  • 拆分倾斜分区为更小的分区

17:14

参考资料

Adaptive Query Execution: Speeding Up Spark SQL at Runtime